package com.xwder.nio.groupchat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * 群聊服务器端
 *
 * 1) 编写一个 NIO 群聊系统，实现服务器端和客户端之间的数据简单通讯（非阻塞） <p>
 * 2) 实现多人群聊  <p>
 * 3) 服务器端：可以监测用户上线，离线，并实现消息转发功能   <p>
 * 4) 客户端：通过 channel 可以无阻塞发送消息给其它所有用户，同时可以接受其它用户发送的消息(有服务器转发 得到)
 *
 * @author xwder
 * @date 2021/3/18 09:43
 **/
public class GroupChatServer {

    private static final Logger logger = LoggerFactory.getLogger(GroupChatServer.class);

    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private static final int PORT = 6666;

    /**
     * 初始化工作
     */
    public GroupChatServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 设置非阻塞模式
            serverSocketChannel.configureBlocking(false);
            // 绑定端口
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            // 将serverSocketChannel注册到selector 接收accept事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            logger.info("初始化selector,serverSocketChannel成功~~");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error("初始化selector,serverSocketChannel发送错误", e);
        }
    }

    /**
     * 监听客户端连接
     */
    public void listen() {
        while (true) {
            try {
                // 不阻塞 1秒超时返回
                int selectCount = selector.select(1000);
                if (selectCount == 0) {
                    // 没有获取到需要处理的事件
                    logger.info("服务器等待客户端处理事件~~");
                    continue;
                }
                // // 如果返回的结果>0,表示已经获取到客户端的事件 通过selectionKey集合可以获取到关注到的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历获取到的事件
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    // OP_ACCEPT 客户端新的连接事件
                    if (selectionKey.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 设置非阻塞
                        socketChannel.configureBlocking(false);
                        // 将给socketChannel注册到Selector并关注事件 OP_READ
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        logger.info("客户端{}上线~~", socketChannel.getRemoteAddress().toString().substring(1));
                    }

                    // OP_READ 客户端发送数据事件
                    if (selectionKey.isReadable()) {
                        // 读取客户端发送的数据
                        readClientSendData(selectionKey);
                    }
                    // 处理时间过后移除 避免重复处理
                    keyIterator.remove();
                }
            } catch (IOException e) {
                logger.error("服务器端监听客户端发送错误", e);
            }
        }
    }

    /**
     * 读取客户端发送的消息并转发到其他客户端
     *
     * @param selectionKey
     */
    public void readClientSendData(SelectionKey selectionKey) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int read = channel.read(byteBuffer);
            if (read > 0) {
                String msg = new String(byteBuffer.array(), StandardCharsets.UTF_8);
                logger.info("服务器接收客户端:{}消息成功，消息长度:{},消息内容:{}", channel.getRemoteAddress().toString().substring(1), read, msg);
                // 向其他客户端转发消息
                sendInfoToOtherClients(msg, channel);
            }
        } catch (IOException e) {
            try {
                logger.error("客户端:{}已下线断开连接", channel.getRemoteAddress().toString().substring(1));
                // 取消注册
                selectionKey.cancel();
                // 关闭客户端channel通道
                channel.close();
            } catch (IOException ioException) {
                logger.info("客户端断开连接发送错误", ioException);
            }
        }


    }

    /**
     * 转发消息到其他客户端，跳过自己
     *
     * @param msg
     * @param selfChannel
     */
    public void sendInfoToOtherClients(String msg, Channel selfChannel) {
        Set<SelectionKey> selectionKeySet = selector.keys();
        selectionKeySet.forEach(selectionKey -> {
            // 通过 key  取出对应的 SocketChannel
            Channel targetChannel = selectionKey.channel();
            // 排除掉自己
            if (targetChannel instanceof SocketChannel && targetChannel != selfChannel) {
                // 转型
                SocketChannel socketChannel = (SocketChannel) targetChannel;

                // 将消息存储到buffer
                ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                // 消息写入channel
                try {
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    logger.error("服务器端转发消息失败", e);
                }
            }
        });
    }

    public static void main(String[] args) {
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}
